package rabbitmq

import (
	"encoding/json"
	"github.com/nsqio/go-nsq"
	"github.com/streadway/amqp"
	utils "snow-im/app/utils"
	"snow-im/config"
	"time"
)

var (
	channel  = "test-channel"
	producer *nsq.Producer
)

type NsqMq struct {
	address map[string]string
}

func (n NsqMq) initProducer(str []string) *nsq.Producer {
	cfg := nsq.NewConfig()
	cfg.HeartbeatInterval = 10 * time.Second
	producer, err := nsq.NewProducer(str[0], cfg)
	utils.Log(nil, "nsq url", str[0], err)
	if err != nil {
		utils.Log(nil, "produce err", err)
	}
	return producer
}
func (n NsqMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	name = config.GetConf().AppName + "_" + name
	if producer == nil {
		producer = n.initProducer(config.GetConf().NsqHost)
	}
	var content, err = json.Marshal(log)
	if err != nil { //不能发布空串，否则会导致error
		utils.Log(nil, "produce err", err)
	}
	if delayTime < 1 {
		err = producer.Publish(name, content) // 发布消息
		if err != nil {
			producer.Stop()
			producer = nil
			n.Produce(name, log, delayTime, args)
		}
	} else {
		err = producer.DeferredPublish(name, time.Duration(delayTime)*time.Second, content) // 发布消息
		if err != nil {
			producer.Stop()
			producer = nil
			n.Produce(name, log, delayTime, args)
		}
	}

	return err
}

// 消费者
type ConsumerT struct {
	hand func(tag uint64, ch *amqp.Channel, msg []byte)
}

func (this *ConsumerT) HandleMessage(msg *nsq.Message) error {
	//utils.Log(nil,"receive", msg.NSQDAddress, "message:", string(msg.Body))
	this.hand(0, nil, msg.Body)
	return nil
}
func (n NsqMq) Consume(name string, hand interface{}) {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = time.Second         //设置重连时间
	c, err := nsq.NewConsumer(name, channel, cfg) // 新建一个消费者
	if err != nil {
		utils.Log(nil, "nsq comume err")
	}
	c.SetLogger(nil, 0)                                                                   //屏蔽系统日志
	c.AddHandler(&ConsumerT{hand: hand.(func(tag uint64, ch *amqp.Channel, msg []byte))}) // 添加消费者接口

	//建立NSQLookupd连接
	if err := c.ConnectToNSQLookupd(config.GetConf().NsqConsumer); err != nil {
		utils.Log(nil, "nsq comume err")
	}

	//建立多个nsqd连接
	if err := c.ConnectToNSQDs(config.GetConf().NsqHost); err != nil {
		utils.Log(nil, "nsq comsumer err", err)
	}

	// 建立一个nsqd连接
	// if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
	//  panic(err)
	// }
	select {}
}
